学习自扔物线的文章http://gank.io/post/560e15be2dca930e00da1083。

粗略学习了最近越来越火的RxJava,进行一个简单的学习总结,这篇博客只涉及使用,不涉及原理

Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid

使用方式:

compile ‘io.reactivex:rxjava:1.0.14’
compile ‘io.reactivex:rxandroid:1.0.1’

我的相关博文:
RxJava使用示例(一): 实现Rxbus代替eventbus
RxJava实现原理探索

什么是RxJava

RxJava 是一个响应式编程框架,采用观察者设计模式。
而观察者模式在之前的博客中有提到,传送门:http://blog.csdn.net/jonstank2013/article/details/50039879

观察者模式简单的来说,可以理解为以下的内容:

  1. 首先,有一个被观察者,Observable,
  2. 有一个观察者,Observer
  3. 它们之间可以建立一种关系:Subscriber
  4. 当关系建立之后,每当被观察者发生了符合通知观察者的条件时,就会通知观察者,观察者再进行相关的处理。

举个栗子:onClickListener
这个里面View是一个被观察者,onClickListner是一个被观察者,他们通过setOnClicklistener()来建立相互间的一种关系,当View被点击的时候,也就是isClicked状态改变了,所以就会通知onClickListener,onClickListener再进行处理。

为什么要使用RxJava

这部分也可以理解成RxJava的优势或者是它的使用场景是什么。

1. 简洁

对于常规的编码来说,经常会出现一种问题,那就是代码不简洁。随着嵌套的增多,在以后或者是别人来接手的时候,看着比较恼火,有的时候还需要重新理一次当初的编写思路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();

而使用RxJava,可以实现链式编码,示例如下:
通过这种链式编码,在逻辑上更容易理解,就算是自己以后再来修改,也会很快的了解当初的思路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});

2. 异步

观察者模式使用的一个目的就是:后台处理,前台回调。这是一个异步机制,所以异步对于Rxjava来说是至关重要的,在RxJava中,设定了一个调度器Scheduler。
注:Rxjava默认是在当前线程运行,所以对于一些操作需要使用调度器配置线程。
通过Scheduler,可以指定没一段代码应该运行在什么样的线程。

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

上面的五种方法引用自扔物线的博客,简单的总结一下就是:
Schedulers.immediate()这个方法是在当前线程运行
Schedulers.newThread()是开启一个新线程,然后在里面运行接下来的代码
Schedulers.io() I/O操作,什么读写文件啊,数据库啊,网络信息交互啊,图片啊什么都往这里面扔
Schedulers.computation(),这个就是针对IO最好不处理的计算工作
AndroidSchedulers.mainThread(),这个应该就不用介绍了

  • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。用上面的button点击事件来说,就是点击button这个事件发生的线程
  • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。用上面的button点击事件来说,就是onClickListener这个事件发生的线程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer number) {
    Log.d(tag, "number:" + number);
    }
    });

注意:subscribeOn()只能定义一次,而observerOn()方法可以定义多次,也就是可以通过observerOn方法实现线程位置的多重转换

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定

3. 结合 Retrofit, RxBinding,RxBus等

Retrofit是一个著名的网络请求库
RxBinding是结合了Rxjava的一种绑定API,类似与onClickListener等,但是它有个优势,就是扩展性更强,例如常见的,防止button被连续点击两次,可以采用:

1
2
3
RxView.clickEvents(button)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribe(clickAction);

RxBus: 使用RxJava来实现EventBus,而不再需要使用 Otto 或者 GreenRobot 的 EventBus

使用方法(不涉及原理)

1. 创建Observer或者Subscriber,即观察者或订阅者。
Subscriber对Observer进行了一些扩展,但是基本使用方法是一样的。在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}

@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}

@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};

Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}

@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}

@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};

Subscriber增加的方法:

  1. onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行),
    onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用doOnSubscribe() 方法。
  2. unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用
    isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后,
    Observable 会持有 Subscriber
    的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause()
    onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

doOnSubscribe()
与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

1
2
3
4
5
6
7
8
9
10
11
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);

2.创建Observable,即被观察者

1
2
3
4
5
6
7
8
9
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});

基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列

1
2
// 依次发送
Observable observable = Observable.just("Hello", "Hi", "Aloha");

from方法就是将数组中的内容拆分,然后再一个个的发

1
2
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);

3. 订阅

1
2
3
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});

不完整定义的回调

在Rxjava中,有三种方法,分别是onNext,onError,onComplete(),前两个是一个参数的,但是泛型值不同,最后一个是无参的,所以可以通过定义一些不完整的回调来实现自定义onNext等。
Action1是只有一个参数的,Action0是只无参的。对于Action1来说,只需要设置对应的泛型就可以自定义onNext或者onError方法了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

变换

RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

在Rxjava中有两种方法,一种是map,一种是flatMap()。通过他们可以将一个事件的类型进行转换,例如string类型,转换成bitmap再返回等等

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});

直接理解来说,map就是一对一转换。flatMap:多层转换。
也就是当一个对象对应多个属性值,比如说一个人对应多个学科成绩时或者是网络请求的多层级返回,就需要使用flatMap。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);